{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<h1 align=\"center\">ArcticDB LazyDataFrame demo</h1>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "\n",
    "<img src=\"https://raw.githubusercontent.com/man-group/ArcticDB/master/static/ArcticDBCropped.png\" alt=\"ArcticDB Logo\" width=\"400\">\n",
    "\n",
    "\n",
    "<span style=\"font-size:1.5em;\">In this demo, we will explore the DataFrame processing options available in ArcticDB using the LazyDataFrame class. We will cover various possibilities of this API, including:</span>\n",
    "- ***Filtering***\n",
    "- ***Projections***\n",
    "- ***Groupbys and Aggregations***\n",
    "- ***Combinations of the above features***\n",
    "\n",
    "<span style=\"font-size:1.5em;\">Why perform the processing in ArcticDB?</span>\n",
    "- ***Performance boost via efficient C++ implementation that uses multi-threading***\n",
    "- ***Efficient data access - only reads the data needed***\n",
    "- ***For very large data sets, some queries are possible even when the full data would not fit into memory***\n",
    "\n",
    "Note that all of the operations described here are also available using the legacy `QueryBuilder` class, but we think this API is more intuitive!"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Demo setup"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Install the necessary packages."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install arcticdb"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Import the necessary libraries."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "import random\n",
    "import arcticdb as adb\n",
    "from arcticdb.util.test import random_strings_of_length"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For this demo we will configure the file-based LMDB backend. ArcticDB achieves its high performance and scale when configured with an object store backend (e.g. S3)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "arctic = adb.Arctic(\"lmdb://arcticdb_demo\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can have an unlimited number of libraries, but we will just create one to start with."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if 'sample' not in arctic.list_libraries():\n",
    "    # library does not already exist\n",
    "    arctic.create_library('sample')\n",
    "lib = arctic.get_library('sample')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Run the cell below to set up preliminary variables. A column with 100,000 unique strings is a pathological case for us: with the default row-slicing policy there are 100,000 rows per data segment, so each unique string will appear around once per data segment in this column."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ten_grouping_values = random_strings_of_length(10, 10, True)\n",
    "one_hundred_thousand_grouping_values = random_strings_of_length(100_000, 10, True)\n",
    "rng = np.random.RandomState()\n",
    "\n",
    "sym_10M = \"demo_10M\"\n",
    "sym_100M = \"demo_100M\"\n",
    "sym_1B = \"demo_1B\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<span style=\"font-size:1.5em;\">Choose which symbol you want to work with</span>\n",
    "- `sym_10M`: symbol with 10 million rows\n",
    "- `sym_100M`: symbol with 100 million rows\n",
    "- `sym_1B`: symbol with 1 billion rows\n",
    "\n",
    "Assign the symbol you want to work with to the `sym` variable.\n",
    "- Example: `sym = sym_10M`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "sym = sym_10M"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<span style=\"font-size:1.5em;\">Run this cell to set up the DataFrame according to the symbol name</span>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if sym == sym_10M:\n",
    "    num_rows = 10_000_000\n",
    "elif sym == sym_100M:\n",
    "    num_rows = 100_000_000\n",
    "elif sym == sym_1B:\n",
    "    num_rows = 1_000_000_000\n",
    "else:\n",
    "    # Fail early rather than hitting an undefined num_rows below\n",
    "    raise ValueError(f\"Unexpected symbol name: {sym}\")\n",
    "\n",
    "input_df = pd.DataFrame(\n",
    "    {\n",
    "        # random.choices already returns a list of k values sampled with replacement\n",
    "        \"grouping_column_10\": random.choices(ten_grouping_values, k=num_rows),\n",
    "        \"grouping_column_100_000\": random.choices(one_hundred_thousand_grouping_values, k=num_rows),\n",
    "        # Uniformly distributed floats in [0, 1)\n",
    "        \"numeric_column\": rng.rand(num_rows),\n",
    "    }\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Demo Start"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "lib.write(sym, input_df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Show how the data has been sliced and written to disk."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "lib._nvs.read_index(sym)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Show the first 100 rows of data as a sample."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "lib.head(sym, n=100).data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Reading"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Read the symbol without any filtering."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "lib.read(sym)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Most of the time is spent allocating Python strings in the column with 100,000 unique strings, so omitting this column is much faster."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "lib.read(sym, columns=[\"grouping_column_10\", \"numeric_column\"])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Filtering"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that all of the values in the numeric column are between 0 and 1. This query therefore does not filter out any data. This demonstrates that doing a full table scan does not significantly impact the performance. Also note that the `read` call is practically instant, as no data is read until `collect` is called on the `LazyDataFrame`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "lazy_df = lib.read(sym, lazy=True)\n",
    "lazy_df = lazy_df[lazy_df[\"numeric_column\"] < 2.0]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "lazy_df.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we are filtering down to approximately 10% of the rows in the symbol. This is faster than reading, as there are now fewer Python strings to allocate."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "lazy_df = lib.read(sym, lazy=True)\n",
    "lazy_df = lazy_df[lazy_df[\"numeric_column\"] < 0.1]\n",
    "df = lazy_df.collect().data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Projections"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Creating a new column as a function of existing columns and constants is approximately the same speed as a filter that doesn't reduce the amount of data returned."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "lazy_df = lib.read(sym, lazy=True)\n",
    "lazy_df[\"new_column\"] = lazy_df[\"numeric_column\"] * 2.0\n",
    "df = lazy_df.collect().data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Equivalently, use the apply method to achieve the same results."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "lazy_df = lib.read(sym, lazy=True)\n",
    "lazy_df.apply(\"new_column\", lazy_df[\"numeric_column\"] * 2.0)\n",
    "lazy_df.collect().data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If using `apply` before the `LazyDataFrame` object has been created, the `col` function can be used as a placeholder for column names."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "lazy_df = lib.read(sym, lazy=True).apply(\"new_column\", adb.col(\"numeric_column\") * 2.0)\n",
    "lazy_df.collect().data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Groupbys and Aggregations"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Grouping is again faster than just reading due to the reduced number of Python string allocations, even with the extra computation performed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "lazy_df = lib.read(sym, lazy=True)\n",
    "lazy_df.groupby(\"grouping_column_10\").agg({\"numeric_column\": \"mean\"})\n",
    "df = lazy_df.collect().data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Even grouping on a pathologically large number of unique values does not significantly reduce the performance."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "lazy_df = lib.read(sym, lazy=True)\n",
    "lazy_df.groupby(\"grouping_column_100_000\").agg({\"numeric_column\": \"mean\"})\n",
    "df = lazy_df.collect().data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Combinations"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "These operations can be arbitrarily combined in a sequential pipeline."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "lazy_df = lib.read(sym, lazy=True)\n",
    "lazy_df = (\n",
    "    lazy_df[lazy_df[\"numeric_column\"] < 0.1]\n",
    "    .apply(\"new_column\", lazy_df[\"numeric_column\"] * 2.0)\n",
    "    .groupby(\"grouping_column_10\")\n",
    "    .agg({\"numeric_column\": \"mean\", \"new_column\": \"max\"})\n",
    ")\n",
    "df = lazy_df.collect().data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Batch Operations"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Set up two symbols containing the same data\n",
    "batch_sym_1 = f'{sym}_1'\n",
    "batch_sym_2 = f'{sym}_2'\n",
    "syms = [batch_sym_1, batch_sym_2]\n",
    "lib.write(batch_sym_1, input_df)\n",
    "lib.write(batch_sym_2, input_df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`read_batch` also has a `lazy` argument, which returns a `LazyDataFrameCollection`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "lazy_dfs = lib.read_batch(syms, lazy=True)\n",
    "lazy_dfs"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The same processing operations can be applied to all of the symbols being read in the batch. Note in the output of the following cell that the pipe `|` is _outside_ the list of `LazyDataFrame`s, so the `WHERE` clause is applied to all of the symbols."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "lazy_dfs = lazy_dfs[lazy_dfs[\"numeric_column\"] < 0.1]\n",
    "lazy_dfs"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Calling `collect()` on a `LazyDataFrameCollection` uses `read_batch` under the hood, and so is generally more performant than issuing a separate `read` call for each symbol in turn."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dfs = lazy_dfs.collect()\n",
    "dfs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dfs[0].data.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dfs[1].data.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Separate processing operations can be applied to the individual symbols in the batch if desired."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "lazy_dfs = lib.read_batch(syms, lazy=True)\n",
    "lazy_dfs = lazy_dfs.split()\n",
    "lazy_dfs"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note in the output of the following cell that the pipes `|` are now _inside_ the list of `LazyDataFrame`s, so the `PROJECT` clauses are applied to individual symbols."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "lazy_dfs[0].apply(\"new_column_1\", 2 * adb.col(\"numeric_column\"))\n",
    "lazy_dfs[1].apply(\"new_column_1\", 4 * adb.col(\"numeric_column\"))\n",
    "lazy_dfs = adb.LazyDataFrameCollection(lazy_dfs)\n",
    "lazy_dfs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dfs = lazy_dfs.collect()\n",
    "dfs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dfs[0].data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dfs[1].data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If desired, these two modes of operation can be combined in an intuitive manner."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "lazy_dfs = lib.read_batch(syms, lazy=True)\n",
    "lazy_dfs = lazy_dfs[lazy_dfs[\"numeric_column\"] < 0.1]\n",
    "lazy_dfs = lazy_dfs.split()\n",
    "lazy_dfs[0].apply(\"new_column_1\", 2 * adb.col(\"numeric_column\"))\n",
    "lazy_dfs[1].apply(\"new_column_1\", 4 * adb.col(\"numeric_column\"))\n",
    "lazy_dfs = adb.LazyDataFrameCollection(lazy_dfs)\n",
    "lazy_dfs = lazy_dfs[lazy_dfs[\"new_column_1\"] < 0.1]\n",
    "lazy_dfs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dfs = lazy_dfs.collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dfs[0].data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dfs[1].data"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
